{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": false
   },
   "source": [
    "# AIStore Python SDK ETL Tutorial"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "e616503b",
   "metadata": {},
   "source": [
    "### Set up constants and initialize the client\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "45d2c741",
   "metadata": {},
   "outputs": [],
   "source": [
    "import hashlib\n",
    "import os\n",
    "from itertools import cycle\n",
    "\n",
    "from aistore import Client\n",
    "from aistore.sdk.etl.webserver.fastapi_server import FastAPIServer\n",
    "from aistore.sdk.etl_templates import MD5\n",
    "\n",
    "BUCKET_NAME = \"bucket-demo\"\n",
    "SPEC_ETL_NAME = \"etl-spec-demo\"\n",
    "CODE_ETL_NAME = \"etl-code-demo\"\n",
    "\n",
    "# Note: AIS-ETLs require Kubernetes.\n",
    "# The default endpoint is the address this tutorial was written against; set the\n",
    "# AIS_ENDPOINT environment variable to run the notebook against another cluster.\n",
    "AIS_ENDPOINT = os.environ.get(\"AIS_ENDPOINT\", \"http://192.168.49.2:8080\")\n",
    "client = Client(AIS_ENDPOINT)\n",
    "client.bucket(bck_name=BUCKET_NAME).create(exist_ok=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": false
   },
   "source": [
    "### We can initialize ETLs with either [code](https://aiatscale.org/docs/etl#init-code-request) or [spec](https://aiatscale.org/docs/etl#init-spec-request)."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": false
   },
   "source": [
    "#### Initialize an ETL with code:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# Defining ETL transformation code\n",
    "import hashlib\n",
    "from aistore.sdk.etl.webserver.fastapi_server import FastAPIServer\n",
    "\n",
    "md5_code_etl = client.etl(etl_name=CODE_ETL_NAME)\n",
    "\n",
    "\n",
    "# Initializing ETL with class using decorator pattern\n",
    "@md5_code_etl.init_class()\n",
    "class MD5TransformETL(FastAPIServer):\n",
    "    \"\"\"ETL server whose transform replaces the input bytes with their MD5 hex digest.\"\"\"\n",
    "\n",
    "    def transform(self, data: bytes, _path: str, _etl_args: str) -> bytes:\n",
    "        \"\"\"Return the MD5 hex digest of `data`, encoded as bytes.\"\"\"\n",
    "        digest_hex = hashlib.md5(data).hexdigest()\n",
    "        return digest_hex.encode()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": false
   },
   "source": [
    "#### Initialize an ETL with spec:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# Use the provided template and substitute in the communication type\n",
    "template = MD5.format(communication_type=\"hpush\")\n",
    "md5_spec_etl = client.etl(etl_name=SPEC_ETL_NAME)\n",
    "md5_spec_etl.init_spec(template=template)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": false
   },
   "source": [
    "Refer to more ETL templates [here](https://github.com/NVIDIA/aistore/blob/main/python/aistore/sdk/etl)."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "888ad4ee",
   "metadata": {},
   "source": [
    "### List ETLs\n",
    "Once initialized, we can verify the ETLs are running:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "4f001731",
   "metadata": {},
   "outputs": [],
   "source": [
    "client.cluster().list_running_etls()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "2a0d8e79",
   "metadata": {},
   "source": [
    "### View ETLs"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "030e8611",
   "metadata": {},
   "outputs": [],
   "source": [
    "md5_code_etl.view()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "80903c9e",
   "metadata": {},
   "outputs": [],
   "source": [
    "md5_spec_etl.view()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "f7813f98",
   "metadata": {},
   "source": [
    "## Get an object with ETL transformation applied"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": false
   },
   "source": [
    "### First, create some objects to transform"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "aeb15852",
   "metadata": {},
   "outputs": [],
   "source": [
    "import random\n",
    "import string\n",
    "import tempfile\n",
    "\n",
    "\n",
    "def create_and_put_object(\n",
    "    client: Client,\n",
    "    bck_name: str,\n",
    "    obj_name: str,\n",
    "    provider: str = \"ais\",\n",
    "    obj_size: int = 0,\n",
    "):\n",
    "    \"\"\"Upload an object made of random ASCII letters and return its content.\n",
    "\n",
    "    Args:\n",
    "        client: AIStore client used for the upload\n",
    "        bck_name: Name of the bucket that receives the object\n",
    "        obj_name: Name to give the new object\n",
    "        provider: Bucket provider, \"ais\" by default\n",
    "        obj_size: Object size in bytes; a falsy value picks a random size in [10, 20)\n",
    "\n",
    "    Returns:\n",
    "        The uploaded content as bytes\n",
    "    \"\"\"\n",
    "    size = obj_size or random.randrange(10, 20)\n",
    "    letters = random.choices(string.ascii_letters, k=size)\n",
    "    content = \"\".join(letters).encode(\"utf-8\")\n",
    "    # put_file() is given a path, so the bytes are staged in a temporary file first\n",
    "    with tempfile.NamedTemporaryFile() as tmp_file:\n",
    "        tmp_file.write(content)\n",
    "        tmp_file.flush()\n",
    "        target = client.bucket(bck_name, provider=provider).object(obj_name)\n",
    "        target.get_writer().put_file(tmp_file.name)\n",
    "    return content"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "56256969",
   "metadata": {},
   "outputs": [],
   "source": [
    "content = create_and_put_object(\n",
    "    client=client, bck_name=BUCKET_NAME, obj_name=\"object-demo.jpg\"\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": false
   },
   "source": [
    "### Get single object with ETL code transformation"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "client.bucket(BUCKET_NAME).object(\"object-demo.jpg\").get_reader(\n",
    "    etl_name=md5_code_etl.name\n",
    ").read_all()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": false
   },
   "source": [
    "### Get single object with ETL spec transformation"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "client.bucket(BUCKET_NAME).object(\"object-demo.jpg\").get_reader(\n",
    "    etl_name=md5_spec_etl.name\n",
    ").read_all()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "52656fc1",
   "metadata": {},
   "source": [
    "## Transform entire bucket with ETL"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "6760478f",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create bucket to store transformed objects\n",
    "dest_bucket = client.bucket(\"transform-destination-bucket\").create(exist_ok=True)\n",
    "\n",
    "# Transform bucket contents (with on-the-fly object renames)\n",
    "transform_job_id = client.bucket(BUCKET_NAME).transform(\n",
    "    etl_name=md5_spec_etl.name,\n",
    "    to_bck=dest_bucket,\n",
    "    prepend=\"transformed-\",\n",
    "    ext={\"jpg\": \"txt\"},\n",
    ")\n",
    "\n",
    "# transform() only starts the job and returns its ID; wait for the job to finish\n",
    "# so that listing the destination bucket shows every transformed object\n",
    "client.job(job_id=transform_job_id).wait()\n",
    "transform_job_id"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "db8ccf1a",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Verify rename operations for transformed objects\n",
    "dest_bucket.list_objects().get_entries()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "a1a2e8ae",
   "metadata": {},
   "source": [
    "### Stop ETLs\n",
    "Stopping an ETL *stops*, but does *not delete*, the Kubernetes pods created for it, and terminates any transforms the ETL is running. A stopped ETL can be resumed with the `start()` method."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "b7ab064f",
   "metadata": {},
   "outputs": [],
   "source": [
    "md5_code_etl.stop()\n",
    "md5_spec_etl.stop()\n",
    "client.cluster().list_running_etls()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "9beb3d0f",
   "metadata": {},
   "source": [
    "### Restart Stopped ETLs"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "cea3c373",
   "metadata": {},
   "outputs": [],
   "source": [
    "md5_code_etl.start()\n",
    "md5_spec_etl.start()\n",
    "client.cluster().list_running_etls()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "e1fb0a93",
   "metadata": {},
   "source": [
    "### Stop & Delete ETLs\n",
    "Once completely finished with the ETLs, we clean up (to free storage) by stopping them with `stop()` and then deleting them with `delete()`.\n",
    "Deleting an ETL deletes all pods created by Kubernetes for the ETL as well as any specifications for the ETL on Kubernetes. Consequently, deleted ETLs cannot be started again and will need to be re-initialized."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "bc33c20e",
   "metadata": {},
   "outputs": [],
   "source": [
    "md5_code_etl.stop()\n",
    "md5_spec_etl.stop()\n",
    "\n",
    "md5_code_etl.delete()\n",
    "md5_spec_etl.delete()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "7aaf1c52",
   "metadata": {},
   "source": [
    "### Starting Deleted ETL Raises Exception"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "cf2a938a",
   "metadata": {},
   "outputs": [],
   "source": [
    "from aistore.sdk.errors import AISError\n",
    "\n",
    "# The error is the point of this cell: catch and display it so that\n",
    "# \"Restart & Run All\" can continue with the cells below\n",
    "try:\n",
    "    md5_code_etl.start()\n",
    "except AISError as err:\n",
    "    print(f\"Starting deleted ETL '{md5_code_etl.name}' failed: {err}\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "02fa415c",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Same demonstration for the spec-based ETL\n",
    "try:\n",
    "    md5_spec_etl.start()\n",
    "except AISError as err:\n",
    "    print(f\"Starting deleted ETL '{md5_spec_etl.name}' failed: {err}\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "278ecb98",
   "metadata": {},
   "source": [
    "### Initialize ETL XOR+Checksum with streaming data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "97214ac4",
   "metadata": {},
   "outputs": [],
   "source": [
    "content = create_and_put_object(\n",
    "    client=client, bck_name=BUCKET_NAME, obj_name=\"object-xor-demo.jpg\", obj_size=256\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "92cce61e",
   "metadata": {},
   "outputs": [],
   "source": [
    "class XORChecksumETL(FastAPIServer):\n",
    "    \"\"\"ETL server that XORs the input with a repeating key and appends an MD5 checksum.\"\"\"\n",
    "\n",
    "    def transform(self, data: bytes, _path: str, _etl_args: str) -> bytes:\n",
    "        \"\"\"XOR `data` with a repeating key and append the MD5 hex digest of the result.\n",
    "\n",
    "        Returns the XOR-ed payload followed by the 32-character hex digest\n",
    "        of that payload.\n",
    "        \"\"\"\n",
    "        key = b\"AISTORE\"\n",
    "        chunk_size = 32\n",
    "        checksum = hashlib.md5()\n",
    "        result = bytearray()\n",
    "\n",
    "        # A single key stream is shared by all chunks, so the key position carries\n",
    "        # over chunk boundaries and the output does not depend on chunk_size.\n",
    "        # `chunk` must stay the first zip() argument: zip() then stops when the\n",
    "        # chunk is exhausted without consuming an extra key byte.\n",
    "        key_stream = cycle(key)\n",
    "\n",
    "        # Process data in chunks for streaming-like behavior\n",
    "        for start in range(0, len(data), chunk_size):\n",
    "            chunk = data[start : start + chunk_size]\n",
    "            out = bytes(a ^ b for a, b in zip(chunk, key_stream))\n",
    "            result.extend(out)\n",
    "            checksum.update(out)\n",
    "\n",
    "        # Append checksum\n",
    "        result.extend(checksum.hexdigest().encode())\n",
    "        return bytes(result)\n",
    "\n",
    "\n",
    "xor_stream_etl = client.etl(\"xor-md5-stream\")\n",
    "xor_stream_etl.init_class()(XORChecksumETL)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": false
   },
   "source": [
    "### Get object with XOR+Checksum ETL and verify checksum"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# Read the object through the XOR+Checksum ETL\n",
    "xor_reader = client.bucket(BUCKET_NAME).object(\"object-xor-demo.jpg\").get_reader(\n",
    "    etl_name=xor_stream_etl.name\n",
    ")\n",
    "xor_obj = xor_reader.read_all()\n",
    "\n",
    "# The last 32 bytes are the appended MD5 hex digest; everything before is the payload\n",
    "data = xor_obj[:-32]\n",
    "checksum = xor_obj[-32:]\n",
    "\n",
    "# Recompute the digest locally; the cell displays True when both values match\n",
    "computed_checksum = hashlib.md5(data).hexdigest().encode()\n",
    "computed_checksum == checksum"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "23ac67da",
   "metadata": {},
   "outputs": [],
   "source": [
    "xor_stream_etl.stop()\n",
    "xor_stream_etl.delete()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": false
   },
   "source": [
    "### Cleanup buckets"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# Delete only the buckets created by this tutorial; any other bucket in the\n",
    "# cluster is left untouched. missing_ok=True keeps the cell re-runnable.\n",
    "for bck_name in (BUCKET_NAME, \"transform-destination-bucket\"):\n",
    "    client.bucket(bck_name).delete(missing_ok=True)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.6 (main, Nov 14 2022, 16:10:14) [GCC 11.3.0]"
  },
  "vscode": {
   "interpreter": {
    "hash": "916dbcbb3f70747c44a77c7bcd40155683ae19c65e1c03b4aa3499c5328201f1"
   }
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
